package com.lyon.demo.netty.client;

import cn.hutool.aop.ProxyUtil;
import cn.hutool.core.util.SerializeUtil;
import com.lyon.demo.common.spi.DefaultSpiLoader;
import com.lyon.demo.common.spi.annotation.LyonSpi;
import com.lyon.demo.common.spi.annotation.Signleton;
import com.lyon.demo.netty.server.SimpleRpcServiceHandlerRegistry;
import com.lyon.demo.rpc.api.core.*;
import com.lyon.demo.rpc.api.endpoint.RpcAccessEndpoint;
import com.lyon.demo.rpc.api.endpoint.Transport;
import com.lyon.demo.rpc.api.endpoint.TransportClient;
import com.lyon.demo.rpc.api.endpoint.TransportServer;
import com.lyon.demo.rpc.api.naming.NamingService;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.rmi.server.ServerNotActiveException;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
 * @author Lyon
 */
@Slf4j
@LyonSpi(value = CommonProtocol.NETTY)
@Signleton
public class NettyRpcAccessEndpoint implements RpcAccessEndpoint {

    /**
     * 命名服务、注册中心rpc
     */
    private SocketAddress socketAddress;
    private NamingService namingService;
    private Properties globalProp;
    private TransportClient transportClient;
    private String serviceName;
    private TransportServer transportServer;
    private RpcServiceHandlerRegistry registry = new SimpleRpcServiceHandlerRegistry();
    private int port;
    private static final List<String> ignoreMethod = List.of("toString", "getClass", "hashCode");

    @SneakyThrows
    @Override
    public <T> T getRemoteService(Class<T> interfaceClazz) {
        ServiceInstances serviceInstances = namingService.lookupService(interfaceClazz);
        if (!serviceInstances.active()) {
            throw new ServerNotActiveException();
        }
        // TODO: 2022/4/15
        final ServiceInstance serviceInstance = serviceInstances.getInstances().get(0);
        final SocketAddress remoteAddress = serviceInstance.getSocketAddress();

        // 创建 与服务端的连接
//        final Transport transport = transportClient.createTransport(remoteAddress, 3000);

        return ProxyUtil.newProxyInstance(new RpcInvocationHandler<>(interfaceClazz,remoteAddress), interfaceClazz);
    }

    @SneakyThrows
    @Override
    public <T> void registerProvider(Class<T> tClass, T obj) {
        registry.registerService(tClass, obj);
        namingService.registerInterface(tClass.getName(), serviceName);
    }

    @Override
    public Closeable startServer(Properties prop) throws Exception {
        this.globalProp = prop;
        final String remoteProtocol = prop.getProperty("remote-protocol");
        final String namingProtocol = prop.getProperty("naming-Protocol");
        final InetSocketAddress namingAddress = (InetSocketAddress) prop.get("naming-addr");
        this.serviceName = prop.getProperty("serviceName");
        // TODO: 2022/4/16
        this.port = (int) prop.get("port");
        // 获取命名服务
        this.namingService = DefaultSpiLoader.loader(NamingService.class, namingProtocol);
        namingService.start(namingAddress, prop);
        // 创建调用服务端
        this.transportServer = DefaultSpiLoader.loader(TransportServer.class, remoteProtocol);
        this.socketAddress = transportServer.start(registry, port);
        log.info("RpcServer端启动 transportServer..");
        // 创建调用客户端
        this.transportClient = DefaultSpiLoader.loader(TransportClient.class, remoteProtocol);
        log.info("RpcClient端启动 transportClient..");
        namingService.registerService(new ServiceInstance(serviceName, socketAddress, new Metadata()));
        log.info("RpcServer端-注册当前服务到注册中心 {}",serviceName);
        return this;
    }

    @Override
    public void close() throws IOException {
        ((Closeable) namingService).close();

        // TODO: 2022/4/16
    }

    class RpcInvocationHandler<T> implements InvocationHandler {

        private Class<T> interfaceClass;
        private SocketAddress socketAddress;

        public RpcInvocationHandler(Class<T> interfaceClass, SocketAddress socketAddress) {
            this.socketAddress = socketAddress;
            this.interfaceClass = interfaceClass;
        }

        @Override
        public Object invoke(Object o, Method method, Object[] objects) throws Throwable {
            final String name = method.getName();
            if (ignoreMethod.contains(name)) {
                return method.invoke(o, objects);
            }
            String interfaceName = interfaceClass.getName();
            String methodName = method.getName();
            final Class<?>[] types = method.getParameterTypes();
            // TODO: 2022/4/16 自定义序列化？ 提高性能
            byte[] args = objects == null ? null : SerializeUtil.serialize(objects);
            final byte[] payload = SerializeUtil.serialize(new RpcRequest(interfaceName, methodName,types, args));
            final Command command = new Command(Types.ServerTypes.exe_method, Versions.V_1, payload);
            final Transport transport = transportClient.createTransport(socketAddress, 3000L);
            final CompletableFuture<Command> future = transport.send(command);
            final Command response = future.get(60, TimeUnit.SECONDS);
            final Result result = SerializeUtil.deserialize(response.getPayload());
            Result.checkSuccess(result);
            return result.getData();
        }
    }

}
